home *** CD-ROM | disk | FTP | other *** search
/ Personal Computer World 2009 February / PCWFEB09.iso / Software / Resources / Chat & Communication / Digsby build 37 / digsby_setup.exe / lib / pyxmpp / streambase.pyo (.txt) < prev    next >
Encoding:
Python Compiled Bytecode  |  2008-10-13  |  19.6 KB  |  750 lines

  1. # Source Generated with Decompyle++
  2. # File: in.pyo (Python 2.5)
  3.  
  4. __revision__ = '$Id: streambase.py 652 2006-08-27 19:41:15Z jajcus $'
  5. __docformat__ = 'restructuredtext en'
  6. import libxml2
  7. import socket
  8. import os
  9. import time
  10. import random
  11. import threading
  12. import errno
  13. import logging
  14. from pyxmpp import xmlextra
  15. from pyxmpp.expdict import ExpiringDictionary
  16. from pyxmpp.utils import to_utf8
  17. from pyxmpp.stanza import Stanza
  18. from pyxmpp.error import StreamErrorNode
  19. from pyxmpp.iq import Iq
  20. from pyxmpp.presence import Presence
  21. from pyxmpp.message import Message
  22. from pyxmpp.jid import JID
  23. from pyxmpp import resolver
  24. from pyxmpp.stanzaprocessor import StanzaProcessor
  25. from pyxmpp.exceptions import StreamError, StreamEncryptionRequired, HostMismatch, ProtocolError
  26. from pyxmpp.exceptions import FatalStreamError, StreamParseError, StreamAuthenticationError
  27. STREAM_NS = 'http://etherx.jabber.org/streams'
  28. BIND_NS = 'urn:ietf:params:xml:ns:xmpp-bind'
  29.  
  30. def stanza_factory(xmlnode, stream = None):
  31.     if xmlnode.name == 'iq':
  32.         return Iq(xmlnode, stream = stream)
  33.     
  34.     if xmlnode.name == 'message':
  35.         return Message(xmlnode, stream = stream)
  36.     
  37.     if xmlnode.name == 'presence':
  38.         return Presence(xmlnode, stream = stream)
  39.     else:
  40.         return Stanza(xmlnode, stream = stream)
  41.  
  42.  
  43. class StreamBase(StanzaProcessor, xmlextra.StreamHandler):
  44.     
  45.     def __init__(self, default_ns, extra_ns = (), keepalive = 0, owner = None):
  46.         StanzaProcessor.__init__(self)
  47.         xmlextra.StreamHandler.__init__(self)
  48.         self.default_ns_uri = default_ns
  49.         if extra_ns:
  50.             self.extra_ns_uris = extra_ns
  51.         else:
  52.             self.extra_ns_uris = []
  53.         self.keepalive = keepalive
  54.         self._reader_lock = threading.Lock()
  55.         self.process_all_stanzas = False
  56.         self.port = None
  57.         self._reset()
  58.         self.owner = owner
  59.         self._StreamBase__logger = logging.getLogger('pyxmpp.Stream')
  60.  
  61.     
  62.     def _reset(self):
  63.         self.doc_in = None
  64.         self.doc_out = None
  65.         self.socket = None
  66.         self._reader = None
  67.         self.addr = None
  68.         self.default_ns = None
  69.         self.extra_ns = { }
  70.         self.stream_ns = None
  71.         self._reader = None
  72.         self.ioreader = None
  73.         self.me = None
  74.         self.peer = None
  75.         self.skip = False
  76.         self.stream_id = None
  77.         self._iq_response_handlers = ExpiringDictionary()
  78.         self._iq_get_handlers = { }
  79.         self._iq_set_handlers = { }
  80.         self._message_handlers = []
  81.         self._presence_handlers = []
  82.         self.eof = False
  83.         self.initiator = None
  84.         self.features = None
  85.         self.authenticated = False
  86.         self.peer_authenticated = False
  87.         self.auth_method_used = None
  88.         self.version = None
  89.         self.last_keepalive = False
  90.  
  91.     
  92.     def _connect_socket(self, sock, to = None):
  93.         self.eof = 0
  94.         self.socket = sock
  95.         if to:
  96.             self.peer = JID(to)
  97.         else:
  98.             self.peer = None
  99.         self.initiator = 1
  100.         self._send_stream_start()
  101.         self._make_reader()
  102.  
  103.     
  104.     def connect(self, addr, port, service = None, to = None):
  105.         self.lock.acquire()
  106.         
  107.         try:
  108.             return self._connect(addr, port, service, to)
  109.         finally:
  110.             self.lock.release()
  111.  
  112.  
  113.     
  114.     def _connect(self, addr, port, service = None, to = None):
  115.         if to is None:
  116.             to = str(addr)
  117.         
  118.         if service is not None:
  119.             self.state_change('resolving srv', (addr, service))
  120.             addrs = resolver.resolve_srv(addr, service)
  121.             if not addrs:
  122.                 addrs = [
  123.                     (addr, port)]
  124.             
  125.         else:
  126.             addrs = [
  127.                 (addr, port)]
  128.         msg = None
  129.         for addr, port in addrs:
  130.             if type(addr) in (str, unicode):
  131.                 self.state_change('resolving', addr)
  132.             
  133.             s = None
  134.             for res in resolver.getaddrinfo(addr, port, 0, socket.SOCK_STREAM):
  135.                 (family, socktype, proto, _unused, sockaddr) = res
  136.                 
  137.                 try:
  138.                     s = socket.socket(family, socktype, proto)
  139.                     self.state_change('connecting', sockaddr)
  140.                     s.connect(sockaddr)
  141.                     self.state_change('connected', sockaddr)
  142.                 except socket.error:
  143.                     msg = None
  144.                     self._StreamBase__logger.debug('Connect to %r failed' % (sockaddr,))
  145.                     if s:
  146.                         s.close()
  147.                         s = None
  148.                         continue
  149.                     continue
  150.  
  151.             
  152.             if s:
  153.                 break
  154.                 continue
  155.         
  156.         if not s:
  157.             if msg:
  158.                 raise socket.error, msg
  159.             else:
  160.                 raise FatalStreamError, 'Cannot connect'
  161.         
  162.         self.addr = addr
  163.         self.port = port
  164.         self._connect_socket(s, to)
  165.         self.last_keepalive = time.time()
  166.  
  167.     
  168.     def accept(self, sock, myname):
  169.         self.lock.acquire()
  170.         
  171.         try:
  172.             return self._accept(sock, myname)
  173.         finally:
  174.             self.lock.release()
  175.  
  176.  
  177.     
  178.     def _accept(self, sock, myname):
  179.         self.eof = 0
  180.         (self.socket, addr) = sock.accept()
  181.         self._StreamBase__logger.debug('Connection from: %r' % (addr,))
  182.         (self.addr, self.port) = addr
  183.         if myname:
  184.             self.me = JID(myname)
  185.         else:
  186.             self.me = None
  187.         self.initiator = 0
  188.         self._make_reader()
  189.         self.last_keepalive = time.time()
  190.  
  191.     
  192.     def disconnect(self):
  193.         self.lock.acquire()
  194.         
  195.         try:
  196.             return self._disconnect()
  197.         finally:
  198.             self.lock.release()
  199.  
  200.  
  201.     
  202.     def _disconnect(self):
  203.         if self.doc_out:
  204.             self._send_stream_end()
  205.         
  206.  
  207.     
  208.     def _post_connect(self):
  209.         pass
  210.  
  211.     
  212.     def _post_auth(self):
  213.         pass
  214.  
  215.     
  216.     def state_change(self, state, arg):
  217.         self._StreamBase__logger.debug('State: %s: %r' % (state, arg))
  218.  
  219.     
  220.     def close(self):
  221.         self.lock.acquire()
  222.         
  223.         try:
  224.             return self._close()
  225.         finally:
  226.             self.lock.release()
  227.  
  228.  
  229.     
  230.     def _close(self):
  231.         self._disconnect()
  232.         if self.doc_in:
  233.             self.doc_in = None
  234.         
  235.         if self.features:
  236.             self.features = None
  237.         
  238.         self._reader = None
  239.         self.stream_id = None
  240.         if self.socket:
  241.             self.socket.close()
  242.         
  243.         self._reset()
  244.  
  245.     
  246.     def _make_reader(self):
  247.         self._reader = xmlextra.StreamReader(self)
  248.  
  249.     
  250.     def stream_start(self, doc):
  251.         self.doc_in = doc
  252.         self._StreamBase__logger.debug('input document: %r' % (self.doc_in.serialize(),))
  253.         
  254.         try:
  255.             r = self.doc_in.getRootElement()
  256.             if r.ns().getContent() != STREAM_NS:
  257.                 self._send_stream_error('invalid-namespace')
  258.                 raise FatalStreamError, 'Invalid namespace.'
  259.         except libxml2.treeError:
  260.             self._send_stream_error('invalid-namespace')
  261.             raise FatalStreamError, "Couldn't get the namespace."
  262.  
  263.         self.version = r.prop('version')
  264.         if self.version and self.version != '1.0':
  265.             self._send_stream_error('unsupported-version')
  266.             raise FatalStreamError, 'Unsupported protocol version.'
  267.         
  268.         to_from_mismatch = 0
  269.         if self.initiator:
  270.             self.stream_id = r.prop('id')
  271.             peer = r.prop('from')
  272.             if peer:
  273.                 peer = JID(peer)
  274.             
  275.             if self.peer:
  276.                 if peer and peer != self.peer:
  277.                     self._StreamBase__logger.debug('peer hostname mismatch: %r != %r' % (peer, self.peer))
  278.                     to_from_mismatch = 1
  279.                 
  280.             else:
  281.                 self.peer = peer
  282.         else:
  283.             to = r.prop('to')
  284.             if to:
  285.                 to = self.check_to(to)
  286.                 if not to:
  287.                     self._send_stream_error('host-unknown')
  288.                     raise FatalStreamError, 'Bad "to"'
  289.                 
  290.                 self.me = JID(to)
  291.             
  292.             self._send_stream_start(self.generate_id())
  293.             self._send_stream_features()
  294.             self.state_change('fully connected', self.peer)
  295.             self._post_connect()
  296.         if not self.version:
  297.             self.state_change('fully connected', self.peer)
  298.             self._post_connect()
  299.         
  300.         if to_from_mismatch:
  301.             raise HostMismatch
  302.         
  303.  
  304.     
  305.     def stream_end(self, _unused):
  306.         self._StreamBase__logger.debug('Stream ended')
  307.         self.eof = 1
  308.         if self.doc_out:
  309.             self._send_stream_end()
  310.         
  311.         if self.doc_in:
  312.             self.doc_in = None
  313.             self._reader = None
  314.             if self.features:
  315.                 self.features = None
  316.             
  317.         
  318.         self.state_change('disconnected', self.peer)
  319.  
  320.     
  321.     def stanza_start(self, doc, node):
  322.         pass
  323.  
  324.     
  325.     def stanza(self, _unused, node):
  326.         self._process_node(node)
  327.  
  328.     
  329.     def error(self, descr):
  330.         raise StreamParseError, descr
  331.  
  332.     
  333.     def _send_stream_end(self):
  334.         self.doc_out.getRootElement().addContent(' ')
  335.         s = self.doc_out.getRootElement().serialize(encoding = 'UTF-8')
  336.         end = s.rindex('<')
  337.         
  338.         try:
  339.             self._write_raw(s[end:])
  340.         except (IOError, SystemError, socket.error):
  341.             e = None
  342.             self._StreamBase__logger.debug('Sending stream closing tag failed:' + str(e))
  343.  
  344.         self.doc_out.freeDoc()
  345.         self.doc_out = None
  346.         if self.features:
  347.             self.features = None
  348.         
  349.  
  350.     
  351.     def _send_stream_start(self, sid = None):
  352.         if self.doc_out:
  353.             raise StreamError, 'Stream start already sent'
  354.         
  355.         self.doc_out = libxml2.newDoc('1.0')
  356.         root = self.doc_out.newChild(None, 'stream', None)
  357.         self.stream_ns = root.newNs(STREAM_NS, 'stream')
  358.         root.setNs(self.stream_ns)
  359.         self.default_ns = root.newNs(self.default_ns_uri, None)
  360.         for prefix, uri in self.extra_ns:
  361.             self.extra_ns[uri] = root.newNs(uri, prefix)
  362.         
  363.         if self.peer and self.initiator:
  364.             root.setProp('to', self.peer.as_utf8())
  365.         
  366.         if self.me and not (self.initiator):
  367.             root.setProp('from', self.me.as_utf8())
  368.         
  369.         root.setProp('version', '1.0')
  370.         if sid:
  371.             root.setProp('id', sid)
  372.             self.stream_id = sid
  373.         
  374.         sr = self.doc_out.serialize(encoding = 'UTF-8')
  375.         self._write_raw(sr[:sr.find('/>')] + '>')
  376.  
  377.     
  378.     def _send_stream_error(self, condition):
  379.         if not self.doc_out:
  380.             self._send_stream_start()
  381.         
  382.         e = StreamErrorNode(condition)
  383.         e.xmlnode.setNs(self.stream_ns)
  384.         self._write_raw(e.serialize())
  385.         e.free()
  386.         self._send_stream_end()
  387.  
  388.     
  389.     def _restart_stream(self):
  390.         self._reader = None
  391.         self.doc_out = None
  392.         self.doc_in = None
  393.         self.features = None
  394.         if self.initiator:
  395.             self._send_stream_start(self.stream_id)
  396.         
  397.         self._make_reader()
  398.  
  399.     
  400.     def _make_stream_features(self):
  401.         root = self.doc_out.getRootElement()
  402.         features = root.newChild(root.ns(), 'features', None)
  403.         return features
  404.  
  405.     
  406.     def _send_stream_features(self):
  407.         self.features = self._make_stream_features()
  408.         self._write_raw(self.features.serialize(encoding = 'UTF-8'))
  409.  
  410.     
  411.     def write_raw(self, data):
  412.         self.lock.acquire()
  413.         
  414.         try:
  415.             return self._write_raw(data)
  416.         finally:
  417.             self.lock.release()
  418.  
  419.  
  420.     
  421.     def _write_raw(self, data):
  422.         logging.getLogger('pyxmpp.Stream.out').debug('OUT: %r', data)
  423.         
  424.         try:
  425.             self.socket.send(data)
  426.         except (IOError, OSError, socket.error):
  427.             e = None
  428.             raise FatalStreamError('IO Error: ' + str(e))
  429.  
  430.  
  431.     
  432.     def _write_node(self, xmlnode):
  433.         if self.eof and not (self.socket) or not (self.doc_out):
  434.             self._StreamBase__logger.debug('Dropping stanza: %r' % (xmlnode,))
  435.             return None
  436.         
  437.         xmlnode = xmlnode.docCopyNode(self.doc_out, 1)
  438.         self.doc_out.addChild(xmlnode)
  439.         
  440.         try:
  441.             ns = xmlnode.ns()
  442.         except libxml2.treeError:
  443.             ns = None
  444.  
  445.         if ns and ns.content == xmlextra.COMMON_NS:
  446.             xmlextra.replace_ns(xmlnode, ns, self.default_ns)
  447.         
  448.         s = xmlextra.safe_serialize(xmlnode)
  449.         self._write_raw(s)
  450.         xmlnode.unlinkNode()
  451.         xmlnode.freeNode()
  452.  
  453.     
  454.     def send(self, stanza):
  455.         self.lock.acquire()
  456.         
  457.         try:
  458.             return self._send(stanza)
  459.         finally:
  460.             self.lock.release()
  461.  
  462.  
  463.     
  464.     def _send(self, stanza):
  465.         if not self.version:
  466.             
  467.             try:
  468.                 err = stanza.get_error()
  469.             except ProtocolError:
  470.                 err = None
  471.  
  472.             if err:
  473.                 err.downgrade()
  474.             
  475.         
  476.         self.fix_out_stanza(stanza)
  477.         self._write_node(stanza.xmlnode)
  478.  
  479.     
  480.     def idle(self):
  481.         self.lock.acquire()
  482.         
  483.         try:
  484.             return self._idle()
  485.         finally:
  486.             self.lock.release()
  487.  
  488.  
  489.     
  490.     def _idle(self):
  491.         self._iq_response_handlers.expire()
  492.         if not (self.socket) or self.eof:
  493.             return None
  494.         
  495.         now = time.time()
  496.         if self.keepalive and now - self.last_keepalive >= self.keepalive:
  497.             self._write_raw(' ')
  498.             self.last_keepalive = now
  499.         
  500.  
  501.     
  502.     def fileno(self):
  503.         self.lock.acquire()
  504.         
  505.         try:
  506.             return self.socket.fileno()
  507.         finally:
  508.             self.lock.release()
  509.  
  510.  
  511.     
  512.     def loop(self, timeout):
  513.         self.lock.acquire()
  514.         
  515.         try:
  516.             while not (self.eof) and self.socket is not None:
  517.                 act = self._loop_iter(timeout)
  518.                 if not act:
  519.                     self._idle()
  520.                     continue
  521.         finally:
  522.             self.lock.release()
  523.  
  524.  
  525.     
  526.     def loop_iter(self, timeout):
  527.         self.lock.acquire()
  528.         
  529.         try:
  530.             return self._loop_iter(timeout)
  531.         finally:
  532.             self.lock.release()
  533.  
  534.  
  535.     
  536.     def _loop_iter(self, timeout):
  537.         import select
  538.         self.lock.release()
  539.         
  540.         try:
  541.             (ifd, _unused, efd) = select.select([
  542.                 self.socket], [], [
  543.                 self.socket], timeout)
  544.         except select.error:
  545.             e = None
  546.             if e.args[0] != errno.EINTR:
  547.                 raise 
  548.             
  549.             ifd = []
  550.             _unused = []
  551.             efd = []
  552.         finally:
  553.             self.lock.acquire()
  554.  
  555.         if self.socket in ifd or self.socket in efd:
  556.             self._process()
  557.             return True
  558.         else:
  559.             return False
  560.  
  561.     
  562.     def process(self):
  563.         self.lock.acquire()
  564.         
  565.         try:
  566.             self._process()
  567.         finally:
  568.             self.lock.release()
  569.  
  570.  
  571.     
  572.     def _process(self):
  573.         
  574.         try:
  575.             
  576.             try:
  577.                 self._read()
  578.             except (xmlextra.error,):
  579.                 e = None
  580.                 self._StreamBase__logger.exception('Exception during read()')
  581.                 raise StreamParseError(unicode(e))
  582.             except:
  583.                 raise 
  584.  
  585.         except (IOError, OSError, socket.error):
  586.             e = None
  587.             self.close()
  588.             raise FatalStreamError('IO Error: ' + str(e))
  589.         except (FatalStreamError, KeyboardInterrupt, SystemExit):
  590.             e = None
  591.             self.close()
  592.             raise 
  593.  
  594.  
  595.     
  596.     def _read(self):
  597.         self._StreamBase__logger.debug('StreamBase._read(), socket: %r', self.socket)
  598.         if self.eof:
  599.             return None
  600.         
  601.         
  602.         try:
  603.             r = self.socket.recv(1024)
  604.         except socket.error:
  605.             e = None
  606.             if e.args[0] != errno.EINTR:
  607.                 raise 
  608.             
  609.             return None
  610.  
  611.         self._feed_reader(r)
  612.  
  613.     
  614.     def _feed_reader(self, data):
  615.         logging.getLogger('pyxmpp.Stream.in').debug('IN: %r', data)
  616.         if data:
  617.             
  618.             try:
  619.                 r = self._reader.feed(data)
  620.                 while r:
  621.                     r = self._reader.feed('')
  622.                 if r is None:
  623.                     self.eof = 1
  624.                     self.disconnect()
  625.             except StreamParseError:
  626.                 self._send_stream_error('xml-not-well-formed')
  627.                 raise 
  628.             except:
  629.                 None<EXCEPTION MATCH>StreamParseError
  630.             
  631.  
  632.         None<EXCEPTION MATCH>StreamParseError
  633.         self.eof = 1
  634.         self.disconnect()
  635.         if self.eof:
  636.             self.stream_end(None)
  637.         
  638.  
  639.     
  640.     def _process_node(self, xmlnode):
  641.         ns_uri = xmlnode.ns().getContent()
  642.         if ns_uri == 'http://etherx.jabber.org/streams':
  643.             self._process_stream_node(xmlnode)
  644.             return None
  645.         
  646.         if ns_uri == self.default_ns_uri:
  647.             stanza = stanza_factory(xmlnode, self)
  648.             self.lock.release()
  649.             
  650.             try:
  651.                 self.process_stanza(stanza)
  652.             finally:
  653.                 self.lock.acquire()
  654.                 stanza.free()
  655.  
  656.         else:
  657.             self._StreamBase__logger.debug('Unhandled node: %r' % (xmlnode.serialize(),))
  658.  
  659.     
  660.     def _process_stream_node(self, xmlnode):
  661.         if xmlnode.name == 'error':
  662.             e = StreamErrorNode(xmlnode)
  663.             self.lock.release()
  664.             
  665.             try:
  666.                 self.process_stream_error(e)
  667.             finally:
  668.                 self.lock.acquire()
  669.                 e.free()
  670.  
  671.             return None
  672.         elif xmlnode.name == 'features':
  673.             self._StreamBase__logger.debug('Got stream features')
  674.             self._StreamBase__logger.debug('Node: %r' % (xmlnode,))
  675.             self.features = xmlnode.copyNode(1)
  676.             self.doc_in.addChild(self.features)
  677.             self._got_features()
  678.             return None
  679.         
  680.         self._StreamBase__logger.debug('Unhandled stream node: %r' % (xmlnode.serialize(),))
  681.  
  682.     
  683.     def process_stream_error(self, err):
  684.         self._StreamBase__logger.debug('Unhandled stream error: condition: %s %r' % (err.get_condition().name, err.serialize()))
  685.  
  686.     
  687.     def check_to(self, to):
  688.         if to != self.me:
  689.             return None
  690.         
  691.         return to
  692.  
  693.     
  694.     def generate_id(self):
  695.         return '%i-%i-%s' % (os.getpid(), time.time(), str(random.random())[2:])
  696.  
  697.     
  698.     def _got_features(self):
  699.         ctxt = self.doc_in.xpathNewContext()
  700.         ctxt.setContextNode(self.features)
  701.         ctxt.xpathRegisterNs('stream', STREAM_NS)
  702.         ctxt.xpathRegisterNs('bind', BIND_NS)
  703.         bind_n = None
  704.         
  705.         try:
  706.             bind_n = ctxt.xpathEval('bind:bind')
  707.         finally:
  708.             ctxt.xpathFreeContext()
  709.  
  710.         if self.authenticated:
  711.             if bind_n:
  712.                 self.bind(self.me.resource)
  713.             else:
  714.                 self.state_change('authorized', self.me)
  715.         
  716.  
  717.     
  718.     def bind(self, resource):
  719.         iq = Iq(stanza_type = 'set')
  720.         q = iq.new_query(BIND_NS, u'bind')
  721.         if resource:
  722.             q.newTextChild(None, 'resource', to_utf8(resource))
  723.         
  724.         self.state_change('binding', resource)
  725.         self.set_response_handlers(iq, self._bind_success, self._bind_error)
  726.         self.send(iq)
  727.         iq.free()
  728.  
  729.     
  730.     def _bind_success(self, stanza):
  731.         jid_n = stanza.xpath_eval('bind:bind/bind:jid', {
  732.             'bind': BIND_NS })
  733.         if jid_n:
  734.             self.me = JID(jid_n[0].getContent().decode('utf-8'))
  735.         
  736.         self.state_change('authorized', self.me)
  737.  
  738.     
  739.     def _bind_error(self, stanza):
  740.         raise FatalStreamError, 'Resource binding failed'
  741.  
  742.     
  743.     def connected(self):
  744.         if self.doc_in and self.doc_out and not (self.eof):
  745.             return True
  746.         else:
  747.             return False
  748.  
  749.  
  750.